Skip to main content

Candles Provider

Provider that supplies data about candles for the selected trading instrument. The chart listens to candlesFlow, a flow of candle data updates, and reacts to changes in it by updating the displayed information accordingly. Whenever there is a new candle data emitted by candlesFlow, the chart updates to reflect this new data, ensuring that the visual representation of the trading instrument's price movements is always current and accurate.

/**
* Interface for receiving candles data
*
* When loading dxCharts library chart, data about candles is taken from this interface
*
* When connecting dxCharts library, developer can implement this interface, or use the default implementation [com.devexperts.dxcharts.provider.candles.DxFeedCandlesProvider] and pass it to the library using [DxChartsDataProviders] data class
*
* The package *com.devexperts.dxcharts.provider.candles:dxcharts_dxfeed_candles_provider* contains an implementation of the interface for working with dxFeed
*
* Use [candlesFlow] to get candles data
*
* Use [isDataAvailable] to get state of candles data. When it is false, we show corresponding message on chart and block everything except changing symbol
*
* Use [isLoading] to get state if candles are loading. When it is true, we show pulsating animation on chart and block interactions with it
*
* Use [changeParams] to change candles params
*/
interface DxChartsCandlesProvider {
/**
* Flow of receiving candles data
*
* Candles data is represented by [DxChartsCandles]
*/
val candlesFlow: StateFlow<DxChartsCandles>
/**
* Flow of receiving state of candlesData
*/
val isDataAvailable: StateFlow<Boolean>
/**
* Flow of receiving state if candles data is loading after params changes
*/
val isLoading: StateFlow<Boolean>
/**
* Changes candles params for dxFeed
*
* @param symbol instrument symbol
* @param priceType price type of the instrument (bid, ask, last, market)
* @param aggregation aggregation of the instrument
* @param extendedHours extended hours flag
* @param alignSessionStart align session start flag
*/
fun changeParams(
symbol: String,
priceType: PriceType,
aggregation: Aggregation,
extendedHours: Boolean,
alignSessionStart: Boolean
)
}

Method: changeParams

This method is used to change the parameters for the candles data. It takes the following parameters:

  • symbol: The symbol of the selected instrument. A symbol is a unique identifier used for trading on the exchange. For example, the symbol for Apple stocks on NASDAQ is "AAPL".
  • priceType: The price type of the instrument. It can be "bid" (the price at which traders are willing to buy the asset), "ask" (the price at which traders are willing to sell the asset), "last" (the price of the last completed transaction), or "market" (the current market price).
  • aggregation: The aggregation of the instrument, the minimum time interval between candles. For example, if the aggregation is 1 minute, each candle on the chart will represent 1 minute of trading.
  • extendedHours: A flag indicating the need to obtain candles outside of trading hours. If the flag is set to true, the data will include information about trading before and after the main trading time.
  • alignSessionStart: A flag indicating the need to align candles with the start of the trading session. If the flag is set to true, the start of each candle will coincide with the start of the trading session.

Example

Here is an example of how to use the changeParams method:

candlesProvider.changeParams(
symbol = "AAPL",
priceType = PriceType.LAST,
aggregation = Aggregation(1, Aggregation.TimeUnit.MINUTES),
extendedHours = true,
alignSessionStart = false
)

Data is sent by updating state of the candlesFlow variable. Class com.devexperts.dxcharts.provider.domain.DxChartsCandles - a complete array of candles for the chart, along with their properties:

/**
* Data class for storing chart candles data and general information about its properties.
*
* @property candlesData List of [CandleDO] objects representing the candles data.
* @property aggregation [Aggregation] of the candles data.
* @property symbol Symbol of the instrument for which the candles data is provided.
* @property extendedHours Flag indicating whether the candles data includes extended hours (before and after market hours).
* @property alignedToSession Flag indicating whether the candles data is aligned to the session. If true, the candles data is aligned to the session. If false, the candles data is aligned to the beginning of the day.
* @property priceType [PriceType] of the candles data. Can be [PriceType.LAST] (price of the last completed transaction), [PriceType.BID] (price at which traders are willing to buy the asset), [PriceType.ASK] (price at which traders are willing to sell the asset), or [PriceType.MARKET] (current market price).
*/
data class DxChartsCandles(
val candlesData: List<CandleDO> = emptyList(),
val aggregation: Aggregation = Aggregation.HOUR,
val symbol: String = "",
val extendedHours: Boolean = false,
val alignedToSession: Boolean = false,
val priceType: PriceType = PriceType.LAST
)
/**
- Data class representing a single candle data object.
-
- @property high High price of the candle.
- @property low Low price of the candle.
- @property open Opening price of the candle.
- @property close Closing price of the candle.
- @property timestamp Timestamp of the candle in milliseconds.
- @property volume Volume of sales for the candle.
- @property expansion Optional expansion flag.
- @property id Optional candle id.
- @property impVolatility Optional implied volatility.
- @property vWap Optional volume weighted average price.
*/
data class CandleDO(
val high: Double,
val low: Double,
val open: Double,
val close: Double,
val timestamp: Long,
val volume: Long,
val expansion: Boolean? = null,
val id: Int? = null,
val impVolatility: Double? = null,
val vWap: Double? = null
)
/**
- Data class for chart's aggregation data.
-
- Aggregation represents the minimum time interval between candles.
-
- @property value Value of the aggregation, representing the number of time units.
- @property multiplier [TimeUnit] multiplier of the aggregation. Can be [TimeUnit.SECONDS], [TimeUnit.MINUTES], etc.
_/
data class Aggregation(
val value: Int,
val multiplier: TimeUnit = TimeUnit.SECONDS,
) {
/**
_ Enum class for storing time unit *
_ Values: [SECONDS], [MINUTES], [HOURS], [DAYS], [WEEKS], [MONTHS], [YEARS]
_/
enum class TimeUnit(val value: String, val letter: String) {
SECONDS("SECONDS", "s"),
MINUTES("MINUTES", "m"),
HOURS("HOURS", "h"),
DAYS("DAYS", "d"),
WEEKS("WEEKS", "w"),
MONTHS("MONTHS", "M"),
YEARS("YEARS", "y");
}
}

Candles set on the chart look as follows:

candles_on_chart

Here is the default implementation of DxChartsCandlesProvider:

/**
* dxCharts library Candles provider for fetching candles data using the dxFeed library.
*
* The "com.devexperts.qd:qds" library is used to connect to the dxFeed API.
*
* The process of obtaining candles is performed in a separate thread [executorService] using [endpoint] from the [connect] method:
* - Data is retrieved from [dataFlow] by subscribing to it. The data is sent as [DxChartsCandles].
*
* To start the provider, you need to call the [connect] method, which:
* - Connects to the dxFeed API at [endpointAddress].
* - Connects [feed] - the object for working with the dxFeed API, to [candlesModel] - the dxFeed API model that receives and processes candles data.
* - Connects [listener], which specifies the algorithm for processing and delivering data to dxCharts lib, to the [candlesModel] model.
* - Launches the dxFeed API within a separate thread with [executorService].
*
* When calling the [connect] method, no parameters for obtaining candles will be passed to the dxFeed API, so no data will be received.
*
* Use the [changeParams] method to pass the necessary parameters for operations. This includes setting the symbol, price type, aggregation period, and other parameters.
*
* The minimum time interval between candle transmissions to dxCharts lib is [CANDLES_DATA_UPDATE_DELAY].
*
* Candle transmission occurs through [dataFlow], which holds the current state of candles and is subscribed to within the dxCharts library.
*
* To terminate the provider, call the [disconnect] method. After calling this method:
* - [listener] is removed from the [candlesModel].
* - [endpoint] is disconnected from the dxFeed API.
* - [feed] is disconnected from [candlesModel].
* - [executorService] finishes its work and clears the thread.
*
* Errors occurring during the disconnection process will be logged and emitted through [errorFlow].
*
* @property endpoint Object for connecting to the dxFeed API.
* @property feed Object for working with the dxFeed API.
* @property executorService Service providing a separate thread for working with the dxFeed API.
* @property candlesModel dxFeed API model that receives and processes candle data.
* @property _dataFlow Private flow for transmitting candle data to dxCharts lib.
* @property dataFlow External flow for transmitting candle data to dxCharts lib.
* @property currentCandlesData Current parameters for obtaining candles.
* @property lastEmitTime Time of the last data transmission to dxCharts lib.
* @property listener Algorithm for processing and delivering data from dxFeed API to dxCharts lib.
* @property _isDataAvailable Represents the availability of data as a StateFlow. It is initially set to 'true'.
* @property isDataAvailable Provides a StateFlow to observe the availability of data.
* @property _isLoading Private flow for the state if we are in the process of loading candles data.
* @property isLoading Provides a StateFlow to observe the loading state.
* @property job Job instance for managing the coroutine responsible for checking data availability.
* @property _errorFlow Internal [MutableStateFlow] for sending errors.
* @property errorFlow [StateFlow] for sending errors.
*/
class DxFeedCandlesProvider(
private val endpointAddress: String
) : DxChartsCandlesProvider, DxChartsErrorProvider<CandlesProviderError> {
private val endpoint = DXEndpoint.getInstance()
private val feed: DXFeed = endpoint.feed
private val executorService = Executors.newFixedThreadPool(1)
private val candlesModel = TimeSeriesEventModel(Candle::class.java)
private val _dataFlow: MutableStateFlow<DxChartsCandles> = MutableStateFlow(DxChartsCandles())
override val dataFlow: StateFlow<DxChartsCandles> get() = _dataFlow
private val _errorFlow: MutableStateFlow<CandlesProviderError?> = MutableStateFlow(null)
override val errorFlow: StateFlow<CandlesProviderError?> get() = _errorFlow
private var currentCandlesData: DxChartsCandles = DxChartsCandles()
private var lastEmitTime = 0L
private val _isDataAvailable: MutableStateFlow<Boolean> = MutableStateFlow(true)
override val isDataAvailable: StateFlow<Boolean> get() = _isDataAvailable
private val _isLoading: MutableStateFlow<Boolean> = MutableStateFlow(true)
override val isLoading: StateFlow<Boolean> get() = _isLoading
private var job: Job? = null
/**
* Algorithm for processing and delivering data from dxFeed API to dxCharts lib
*
* Upon receiving new candle data, if more than [CANDLES_DATA_UPDATE_DELAY] milliseconds have passed since the last data transmission to dxCharts lib,
* the data is transmitted to dxCharts lib using [dataFlow].
*
* Candles received from dxFeed API [com.dxfeed.model.IndexedEventModel] are transformed into [CandleDO] using the [toDataObject] method.
*/
private val listener = ObservableListModelListener<Candle> {
if (System.currentTimeMillis() - CANDLES_DATA_UPDATE_DELAY >= lastEmitTime) {
val candlesData = candlesModel.eventsList.toDataObject()
startDataAvailabilityObserver(candlesData)
}
}
private fun startDataAvailabilityObserver(candlesData: List<CandleDO>) {
if (job != null) {
job?.cancel()
job = null
}
job = CoroutineScope(Dispatchers.IO).launch {
try {
if (candlesData.isEmpty()) {
_isLoading.tryEmit(true)
delay(DATA_AVAILABILITY_CHECK_DELAY)
if (candlesModel.eventsList.isEmpty()) {
updateChartData(candlesData, false)
}
} else {
updateChartData(candlesData, true)
}
} catch (e: CancellationException){
Log.d("DxChartsProviderError", "Coroutine was cancelled")
} catch (e: Exception) {
_errorFlow.tryEmit(CandlesProviderError.UnknownError("Unknown error while observing candles availability: $e.message"))
}
}
}
/**
* Connection to the dxFeed API
* - Connects [endpoint] to the dxFeed API at [endpointAddress].
* - Connects [feed] to [candlesModel].
* - Connects [listener] to [candlesModel].
* - Launches [endpoint] within a separate thread [executorService].
*/
fun connect() {
try {
_errorFlow.tryEmit(null)
endpoint.connect(endpointAddress)
candlesModel.attach(feed)
startDataAvailabilityObserver(emptyList())
candlesModel.eventsList.addListener(listener)
executorService.execute {
try {
endpoint.awaitNotConnected()
} catch (e: Exception) {
_errorFlow.tryEmit(CandlesProviderError.NetworkError("Connection to candle provider was interrupted: $e.message", e))
}
}
} catch (e: Exception) {
_errorFlow.tryEmit(CandlesProviderError.UnknownError("Unknown error during connecting to candle provider: $e.message", e))
}
}
/**
* Changing parameters for obtaining candles
*
* - Sets the instrument name [CandleSymbol], as well as the price type [CandlePrice] and candle period [CandlePeriod].
* - Sets flags [CandleSession] and [CandleAlignment] based on the passed parameters.
* - Sets the current parameters for obtaining candles in [currentCandlesData].
*
* @param symbol Instrument name for which to obtain candles
* @param priceType Price type by which to obtain candles
* @param aggregation Time interval between candles on the chart
* @param extendedHours Flag indicating the need to obtain candles outside of trading hours
* @param alignSessionStart Flag indicating the need to align candles with the start of the trading session
*/
@Synchronized
override fun changeParams(
symbol: String,
priceType: com.devexperts.dxcharts.provider.domain.PriceType,
aggregation: Aggregation,
extendedHours: Boolean,
alignSessionStart: Boolean
) {
try {
candlesModel.fromTime = START_TIME
candlesModel.symbol =
CandleSymbol.valueOf(
symbol,
when (priceType) {
com.devexperts.dxcharts.provider.domain.PriceType.BID -> CandlePrice.BID
com.devexperts.dxcharts.provider.domain.PriceType.ASK -> CandlePrice.ASK
com.devexperts.dxcharts.provider.domain.PriceType.LAST -> CandlePrice.LAST
com.devexperts.dxcharts.provider.domain.PriceType.MARKET -> CandlePrice.MARK
},
aggregation.toCandlePeriod(),
if (extendedHours || !aggregation.isTHOEnabled()) CandleSession.ANY else CandleSession.REGULAR,
if (alignSessionStart) CandleAlignment.SESSION else CandleAlignment.MIDNIGHT
)
currentCandlesData = DxChartsCandles(
symbol = symbol,
aggregation = aggregation,
extendedHours = extendedHours,
alignedToSession = alignSessionStart,
priceType = priceType
)
} catch (e: IllegalArgumentException) {
_errorFlow.tryEmit(CandlesProviderError.InvalidSymbol("Invalid symbol: $symbol", e))
} catch (e: Exception) {
_errorFlow.tryEmit(CandlesProviderError.UnknownError("Unknown error after changing parameters for candle provider: $e.message", e))
}
}
/**
* Updates the chart data with the provided list of candles and data availability status.
*
* @param candlesData The list of candles to update the chart data with.
* @param isDataAvailable The status indicating whether data is available.
*/
private suspend fun updateChartData(candlesData: List<CandleDO>, isDataAvailable: Boolean) {
_dataFlow.tryEmit(
currentCandlesData.copy(
candlesData = candlesData
)
)
_isDataAvailable.tryEmit(isDataAvailable)
lastEmitTime = System.currentTimeMillis()
delay(500)
_isLoading.tryEmit(false)
}
private fun Aggregation.isTHOEnabled(): Boolean{
return when (this.multiplier){
TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS -> true
else -> false
}
}
/**
* Disconnect from the dxFeed API
* - [job] canceled and changed to null
* - [listener] is removed from the [candlesModel].
* - [endpoint] is disconnected from the dxFeed API.
* - [feed] is disconnected from [candlesModel].
* - [executorService] finishes its work and clears the thread.
* - The instrument name within [candlesModel] is set to an empty string.
*/
fun disconnect() {
try {
job?.cancel()
job = null
candlesModel.eventsList.removeListener(listener)
endpoint.disconnectAndClear()
candlesModel.detach(feed)
executorService.shutdown()
candlesModel.symbol = ""
} catch (e: Exception) {
_errorFlow.tryEmit(CandlesProviderError.UnknownError("Unknown error during disconnection of candle provider $e.message"))
}
}
/**
* @value ENDPOINT_ADDRESS URL address of the dxFeed API
* @value CANDLES_DATA_UPDATE_DELAY minimum time interval between candle transmissions to dxCharts lib
* @value DATA_AVAILABILITY_CHECK_DELAY time interval for understanding that we have no candles data
* @value START_TIME the start timestamp in milliseconds from which candle data is provided
*/
companion object {
const val CANDLES_DATA_UPDATE_DELAY = 200L
const val DATA_AVAILABILITY_CHECK_DELAY = 14500L
const val START_TIME = 0L
}
}
/**
- Conversion of [Aggregation] to [CandlePeriod]
*/
private fun Aggregation.toCandlePeriod(): CandlePeriod {
val candleType = when (this.multiplier) {
TimeUnit.SECONDS -> CandleType.SECOND
TimeUnit.MINUTES -> CandleType.MINUTE
TimeUnit.HOURS -> CandleType.HOUR
TimeUnit.DAYS -> CandleType.DAY
TimeUnit.WEEKS -> CandleType.WEEK
TimeUnit.MONTHS -> CandleType.MONTH
TimeUnit.YEARS -> CandleType.YEAR
}
return CandlePeriod.valueOf(this.value.toDouble(), candleType)
}
/**
- Conversion of [List] of [Candle] to [List] of [CandleDO]
*/
private fun List<Candle>.toDataObject() =
this.map {
it.toDataObject()
}
/**
- Conversion of [Candle] to [CandleDO]
*/
private fun Candle.toDataObject(): CandleDO = CandleDO(
high = this.high,
low = this.low,
open = this.open,
close = this.close,
timestamp = this.time,
volume = this.volume,
expansion = false,
id = null,
impVolatility = impVolatility,
vWap = vwap
)
/**
- Sealed class for representing different types of errors in the DxFeedCandlesProvider.
*/
sealed class CandlesProviderError(override val message: String, override val error: Throwable?) : ProviderError {
data class NetworkError(
override val message: String,
override val error: Throwable? = null
) : CandlesProviderError(message, error)
data class InvalidSymbol(
override val message: String,
override val error: Throwable? = null
) : CandlesProviderError(message, error)
data class UnknownError(
override val message: String,
override val error: Throwable? = null
) : CandlesProviderError(message, error)
}